Airflow-провайдеры
В dataCraft Core есть два провайдера для Apache Airflow:
apache_airflow_providers_datacraft_airbyteПакет airflow_providers_datacraft_airbyte позволяет вызывать различные методы API Airbyte как задачи в Airflow. Оператор AirbyteGeneralOperator дает возможность выполнить произвольный вызов API. Остальные операторы вызывают конкретные методы, но при этом в отличие от API дают возможность работать не с ID, а с именами объектов. Например, для создания Airbyte Source требуется указать ID Airbyte Workspace. В операторе вместо этого можно указать его имя. Чтобы преобразовать имя в ID, потребуется передать дополнительный аргумент, который получается с помощью другого оператора (в данном случае AirbyteListWorkspaceOperator. Если имя уникальное, оператор сам подставит в вызов API нужный ID, если нет — вернет ошибку.
На момент написания, есть две версии API Airbyte:
По операторах по возможности используется официальное API, те части, которые там еще не реализованы, сделаны с помощью устаревшего API.
Провайдер содержит:
Список DAG’ов:
| Название | Рещаемая задача |
|---|---|
| create_connections | Создаёт Airbyte Connection |
| install_connectors | Устанавливает коннекторы в Airbyte на основе данных из конфига connectors. |
| generate_models | Создает файлы моделей dbt, заполненные содержимым по умолчанию, т.е. вызовом макросов по каждому шагу методологии dataCraft Core |
| template_configs | DAG используется для генерации конфигов типа templated_file |
| sync_data | Синхронизирует соединения в Airbyte и запускает модели dbt |
Интеграция с dataCraft
apache_airflow_providers_datacraft_dagsВсе DAG’и проекта dataCraft Core имеют общую структуру для того, чтобы пользователю легче было построить собственную логику на основе имеющихся “строительных блоков”.
Во всех DAG’ах есть параметр namespace, который определяет:
get_configs(...) определяются все конфиги, релевантные для него.Все шаги далее представляют собой задачу или группу задач в Airflow.
2. Шаг before, на котором DAG выполняет подготовительную работу, например, создает необходимые папки (см. документацию по конкретному DAG’у).
3. Шаг prepare, на котором DAG определяет список для итерации. Он определяет содержимое всех задач на следующем этапе, например, соединения Airbyte, для которых нужно включить синхронизацию. Пользователь шага prepare и перед следующим шагом iterate может добавить задачу, которая будет модифицировать этот список.
4. Шаг iterate, на котором на каждый элемент списка для итерации создается динамическая задача или динамическая группа задач.
5. Шаг after, на котором DAG очищает или освобождает выделенные ресурсы, например, удаляет созданные временные папки и файлы.
В среде с установленным Airflow выполнить:
pip install apache_airflow_datacraft_dags_provider
Данная команда установит оба пакета. Если нужен только функционал, связанных с Airbyte, то нужно установить пакет apache_airflow_datacraft_dags_provider
В Airflow создать DAG со следующим содержимым:
from apache_airflow_providers_datacraft import DagBuilder
DagBuilder.create_dags() # Создаем все DAGи с параметрами по умолчанию
Пример с кастомизацией:
from apache_airflow_providers_datacraft import DagBuilder
dag = DagBuilder.prepare_dag("generate_models")
dag.schedule_interval = "@weekly" # Изменяем расписание запуска на 1 раз в неделю
def delete_normalize(prepared_tasks):
del prepared_tasks['1_silos']['normalize']
dag.add_prepare_hook(delete_normalize) # Удаляем задачи, связанные со слоем normalize
apache_airflow_providers_datacraft_defaultsПозволяет получать значения конфигов в тех случаях, когда пользователь не задал никаких значений. Например, при первоначальной настройке Airbyte требуется установить коннекторы. За это отвечает DAG install_connectors, для которого требуется указать пути к образам коннекторов и их документации. Если пользователь этого не сделал, можно взять список коннекторов по умолчанию, который лежит в файле connectors.yaml данного пакета. Его содержимое в виде словаря можно получить с помощью функции get_datacraft_defaults.
Для обратной совместимости пакет предусматривает хранение старых версий конфигов. Они имеют такое название, но с суффиксом версии, например connectors_v1.1.yaml. Этот суффикс передается при вызове get_datacraft_defaults.
Иногда конфиг по умолчанию зависит от переменной, например, названия проекта. В этом случае в пакете этот конфиг содержится в виде шаблона Jinja, например:
...
entities:
{% if feature_has_metrika %}
- YandexClientId
{% endif %}
- AccountId
...
Переменные для подстановки в шаблон передаются как аргумент в get_datacraft_defaults в виде словаря, например {"feature_has_metrika": true}.
from apache_airflow_providers_datacraft_defaults import get_datacraft_defaults
get_datacraft_defaults('connectors', 'yaml', '')
Функция get_datacraft_defaults принимает три аргумента:
config_name — название конфига, значение по умолчанию для которого нужно найтиformat — формат, json или yamlsuffix — суффикс для выбора нужной версии (по умолчанию "")template_variables — словарь с переменными для подстановки в шаблон (по умолчанию пустой).